package com.danan.data_collector.util;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializeConfig;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.fastjson.serializer.ValueFilter;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;

public class MyOracleCdcDeserialization implements DebeziumDeserializationSchema<String> {

    /**
     * {
     * "database":"",
     * "tableName":"",
     * "after":{"id":"1001","name":"zs"...},
     * "before":{"id":"1001","name":"zs"...},
     * "type":"insert"
     * }
     */
    private static String timeZoneId = "Asia/Shanghai";

    public static final ValueFilter FILTER = new ValueFilter() {
        @Override
        public Object process(Object obj, String s, Object v) {
            if(v==null)
                return "";
            return v;
        }
    };

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        //1.创建JSONObject对象用来存放最终结果
        JSONObject result = new JSONObject();

        //TODO 获取数据库&表名
        String topic = sourceRecord.topic();
        String[] split = topic.split("\\.");
        String database = split[1];
        String tableName = split[2];

        //TODO 获取before&after数据
        Struct value = (Struct) sourceRecord.value();

        //TODO after
        Struct after = value.getStruct("after");
        JSONObject afterJSON = new JSONObject();
        //判断是否有after数据
        if (after != null) {
            Schema schema = after.schema();
            List<Field> fields = schema.fields();
            for (Field filed : fields) {
                String fieldTypeName = filed.schema().type().getName();
                String fieldSchemaName = filed.schema().name();
                Object data = after.get(filed);
                if ("int64".equals(fieldTypeName) && "io.debezium.time.Timestamp".equals(fieldSchemaName) && data != null) {
                    afterJSON.put(filed.name(), String.format("to_date('%s','yyyy-mm-dd hh24:mi:ss')",DateFormatUtil.toYmdHms(Long.parseLong(data.toString()) - 8 * 60 * 60 * 1000)));
                } else if ("string".equals(fieldTypeName) && "io.debezium.time.ZonedTimestamp".equals(fieldSchemaName) && data != null) {
                    afterJSON.put(filed.name(), String.format("to_date('%s','yyyy-mm-dd hh24:mi:ss')",DateFormatUtil.toYmdHms(data.toString())));
                } else if ("int32".equals(fieldTypeName) && "io.debezium.time.Date".equals(fieldSchemaName) && data != null) {
                    int day = (int) data;
                    //System.out.println(day);
                    long sceond = day * 24 * 60 * 60L * 1000;
                    String dateStr = DateFormatUtil.toDate(sceond);
                    afterJSON.put(filed.name(), String.format("to_date('%s','yyyy-mm-dd')",dateStr));
                } else if("struct".equals(fieldTypeName) && "io.debezium.data.VariableScaleDecimal".equals(fieldSchemaName) && data != null){
                    SpecialValueDecimal svd = VariableScaleDecimal.toLogical((Struct) data);
                    Optional<BigDecimal> decimalValue = svd.getDecimalValue();
                    decimalValue.ifPresent(bigDecimal -> afterJSON.put(filed.name(), String.format("'%s'",bigDecimal)));
                } else {
                    afterJSON.put(filed.name(), data == null ? "null" : String.format("'%s'",data));
                }
            }
        }

        //TODO before
        Struct before = value.getStruct("before");
        JSONObject beforeJSON = new JSONObject();
        //判断是否有before数据
        if (before != null) {
            Schema schema = before.schema();
            List<Field> fields = schema.fields();
            for (Field filed : fields) {
                String fieldTypeName = filed.schema().type().getName();
                String fieldSchemaName = filed.schema().name();
                Object data = before.get(filed);
                if ("int64".equals(fieldTypeName) && "io.debezium.time.Timestamp".equals(fieldSchemaName) && data != null) {
                    beforeJSON.put(filed.name(), String.format("to_date('%s','yyyy-mm-dd hh24:mi:ss')",DateFormatUtil.toYmdHms(Long.parseLong(data.toString()) - 8 * 60 * 60 * 1000)));
                } else if ("string".equals(fieldTypeName) && "io.debezium.time.ZonedTimestamp".equals(fieldSchemaName) && data != null) {
                    beforeJSON.put(filed.name(), String.format("to_date('%s','yyyy-mm-dd hh24:mi:ss')",DateFormatUtil.toYmdHms(data.toString())));
                } else if ("int32".equals(fieldTypeName) && "io.debezium.time.Date".equals(fieldSchemaName) && data != null) {
                    int day = (int) data;
                    //System.out.println(day);
                    long sceond = day * 24 * 60 * 60L * 1000;
                    String dateStr = DateFormatUtil.toDate(sceond);
                    beforeJSON.put(filed.name(), String.format("to_date('%s','yyyy-mm-dd')",dateStr));
                } else if("struct".equals(fieldTypeName) && "io.debezium.data.VariableScaleDecimal".equals(fieldSchemaName) && data != null){
                    SpecialValueDecimal svd = VariableScaleDecimal.toLogical((Struct) data);
                    Optional<BigDecimal> decimalValue = svd.getDecimalValue();
                    decimalValue.ifPresent(bigDecimal -> beforeJSON.put(filed.name(), String.format("'%s'",bigDecimal)));
                } else {
                    beforeJSON.put(filed.name(), data == null ? "null" : String.format("'%s'",data));
                }
            }
        }

        //TODO 获取操作类型 DELETE UPDATE CREATE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        switch (type){
            case "read":
                type = "r";
                break;
            case "update":
                type = "u";
                break;
            case "create":
                type = "c";
                break;
            case "delete":
                type = "d";
                break;
        }

        result.put("schema", database);
        result.put("table", tableName);
        result.put("after", afterJSON);
        result.put("before", beforeJSON);
        result.put("type", type);

        collector.collect(JSON.toJSONString(result, SerializerFeature.WriteSlashAsSpecial));
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }


}